package com.stay4it.rxjava;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;

/**
 * Demonstrates the relationship between RxJava's {@code Observer} and {@code Subscriber}.
 * Reference: http://blog.csdn.net/jdsjlzx/article/details/51534504
 *
 * <pre>{@code
 * public interface Subscription {
 *    void unsubscribe();
 *    boolean isUnsubscribed();
 * }
 *
 * public abstract class Subscriber<T> implements Observer<T>, Subscription
 * }</pre>
 *
 * <p>{@code Subscriber} implements the {@code Subscription} interface, so it offers methods to
 * cancel a subscription ({@code unsubscribe}) and to query whether it has been cancelled
 * ({@code isUnsubscribed}). {@code Observer} has neither of these methods.
 *
 * <p>The main difference between {@code Observer} and {@code Subscriber} is whether the
 * subscription has been cancelled once {@code onCompleted()} has finished executing.
 */
public class RxJava14 {

    /**
     * Subscribes a {@code Subscriber} to an observable, prints the subscription state before and
     * after an explicit {@code unsubscribe()}, and then tries to reuse the same subscriber.
     *
     * <p>The {@code subscribe} overload exercised here, as quoted in the original notes:
     *
     * <pre>{@code
     * public final Subscription subscribe(Subscriber<? super T> subscriber) {
     *     return Observable.subscribe(subscriber, this);
     * }
     * }</pre>
     */
    private static void test1() {
        // Parameterized types (instead of raw types) keep the subscribe() calls type-checked.
        Observable<String> observable = Observable.just("Hello");
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(String value) {
                System.out.println("onNext value :" + value);
            }
        };

        Subscription subscription = observable.subscribe(subscriber);

        System.out.println("1.subscription.isUnsubscribed() ? " + subscription.isUnsubscribed());
        subscription.unsubscribe();
        System.out.println("2. subscription.isUnsubscribed() ? " + subscription.isUnsubscribed());

        System.out.println("----------------");

        // The subscriber has already unsubscribed from its Observable; re-subscribing the
        // old subscriber (observer) has no effect.
        observable.subscribe(subscriber);

        System.out.println("*****************");

        // The subscriber's subscription has already been cancelled.
        Observable.just("Hello,world").subscribe(subscriber);
    }

    /**
     * Same scenario as {@link #test1()}, but with a plain {@code Observer} instead of a
     * {@code Subscriber}: subscribes, prints the subscription state before and after
     * {@code unsubscribe()}, and then subscribes the same observer again.
     */
    private static void test2() {
        // Parameterized types (instead of raw types) keep the subscribe() calls type-checked.
        Observable<String> observable = Observable.just("Hello");
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(String value) {
                System.out.println("onNext value :" + value);
            }
        };

        Subscription subscription = observable.subscribe(observer);

        System.out.println("1.subscription.isUnsubscribed() ? " + subscription.isUnsubscribed());
        subscription.unsubscribe();
        System.out.println("2. subscription.isUnsubscribed() ? " + subscription.isUnsubscribed());

        System.out.println("----------------");

        // Re-establishes the subscription.
        observable.subscribe(observer);

        System.out.println("*****************");

        // Re-establishes the subscription.
        Observable.just("Hello,world").subscribe(observer);
    }

    /**
     * Besides {@code subscribe(Observer)} and {@code subscribe(Subscriber)}, {@code subscribe()}
     * also accepts partially defined callbacks; RxJava automatically creates a
     * {@code Subscriber} from them.
     *
     * <p>Interfaces to know: {@code Action1} and {@code Action0}.
     */
    private static void testAction() {
        // Handles what would go into onNext().
        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("call value = " + s);
            }
        };
        // Handles what would go into onError().
        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("call throwable = " + throwable);
            }
        };
        // Handles what would go into onCompleted().
        Action0 onCompletedAction = new Action0() {
            @Override
            public void call() {
                System.out.println("call onCompleted");
            }
        };

        // Parameterized type (instead of a raw type) keeps the subscribe() calls type-checked.
        Observable<String> observable = Observable.just("Hello", "World");

        // A Subscriber is created automatically, using onNextAction to define onNext().
        observable.subscribe(onNextAction);
        // A Subscriber is created automatically, using onNextAction and onErrorAction to
        // define onNext() and onError().
        observable.subscribe(onNextAction, onErrorAction);
        // A Subscriber is created automatically, using onNextAction, onErrorAction and
        // onCompletedAction to define onNext(), onError() and onCompleted().
        observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    }

    /**
     * Entry point; runs only the {@link #test1()} demo.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException declared by the original signature and kept for
     *         compatibility; nothing in this method visibly throws it
     */
    public static void main(String[] args) throws InterruptedException {
        test1();
    }

}
